/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.security.authorizer

import java.{lang, util}
import java.util.concurrent.{CompletableFuture, CompletionStage}
import com.typesafe.scalalogging.Logger
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk._
import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.security.authorizer.AclEntry.RESOURCE_SEPARATOR
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, UnsupportedVersionException}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.resource._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV1
import org.apache.kafka.server.config.ZkConfigs
import org.apache.zookeeper.client.ZKClientConfig

import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, immutable, mutable}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Random, Success, Try}

object AclAuthorizer {
  // Optional override zookeeper cluster configuration where acls will be stored. If not specified,
  // acls will be stored in the same zookeeper where all other kafka broker metadata is stored.
  val configPrefix: String = "authorizer."
  private val ZkUrlProp = s"${configPrefix}zookeeper.url"
  private val ZkConnectionTimeOutProp = s"${configPrefix}zookeeper.connection.timeout.ms"
  private val ZkSessionTimeOutProp = s"${configPrefix}zookeeper.session.timeout.ms"
  private val ZkMaxInFlightRequests = s"${configPrefix}zookeeper.max.in.flight.requests"

  // Semi-colon separated list of users that will be treated as super users and will have access to all the resources
  // for all actions from all hosts, defaults to no super users.
  val SuperUsersProp: String = "super.users"
  // If set to true when no acls are found for a resource, authorizer allows access to everyone. Defaults to false.
  val AllowEveryoneIfNoAclIsFoundProp: String = "allow.everyone.if.no.acl.found"

  case class VersionedAcls(acls: Set[AclEntry], zkVersion: Int) {
    def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
  }

  private class AclSeqs(seqs: Seq[AclEntry]*) {
    def find(p: AclEntry => Boolean): Option[AclEntry] = {
      // Lazily iterate through the inner `Seq` elements and stop as soon as we find a match
      val it = seqs.iterator.flatMap(_.find(p))
      if (it.hasNext) Some(it.next())
      else None
    }

    def isEmpty: Boolean = !seqs.exists(_.nonEmpty)
  }

  val NoAcls: VersionedAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion)
  val WildcardHost: String = "*"

  // Orders by resource type, then resource pattern type and finally reverse ordering by name.
  class ResourceOrdering extends Ordering[ResourcePattern] {

    def compare(a: ResourcePattern, b: ResourcePattern): Int = {
      val rt = a.resourceType.compareTo(b.resourceType)
      if (rt != 0)
        rt
      else {
        val rnt = a.patternType.compareTo(b.patternType)
        if (rnt != 0)
          rnt
        else
          (a.name compare b.name) * -1
      }
    }
  }

  private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
    val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG).
      map(_.toString.trim).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
    if (!zkSslClientEnable)
      new ZKClientConfig
    else {
      // start with the base config from the Kafka configuration
      // be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
      val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable = true)
      // add in any prefixed overlays
      ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.forKeyValue { (kafkaProp, sysProp) =>
        configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
          zkClientConfig.setProperty(sysProp,
            if (kafkaProp == ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
              (prefixedValue.toString.trim.toUpperCase == "HTTPS").toString
            else
              prefixedValue.toString.trim)
        }
      }
      zkClientConfig
    }
  }

  private def validateAclBinding(aclBinding: AclBinding): Unit = {
    if (aclBinding.isUnknown)
      throw new IllegalArgumentException("ACL binding contains unknown elements")
    if (aclBinding.pattern().name().contains("/"))
      throw new IllegalArgumentException(s"ACL binding contains invalid resource name: ${aclBinding.pattern().name()}")
  }

  def loadAllAcls(
    zkClient: KafkaZkClient,
    logger: Logging,
    aclConsumer: (ResourcePattern, VersionedAcls) => Unit
  ): Unit = {
    ZkAclStore.stores.foreach { store =>
      val resourceTypes = zkClient.getResourceTypes(store.patternType)
      for (rType <- resourceTypes) {
        val resourceType = Try(SecurityUtils.resourceType(rType))
        resourceType match {
          case Success(resourceTypeObj) =>
            val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj)
            for (resourceName <- resourceNames) {
              val resource = new ResourcePattern(resourceTypeObj, resourceName, store.patternType)
              val versionedAcls = getAclsFromZk(zkClient, resource)
              aclConsumer.apply(resource, versionedAcls)
            }
          case Failure(_) => logger.warn(s"Ignoring unknown ResourceType: $rType")
        }
      }
    }
  }

  private def getAclsFromZk(zkClient: KafkaZkClient, resource: ResourcePattern): VersionedAcls = {
    zkClient.getVersionedAclsForResource(resource)
  }
}

class AclAuthorizer extends Authorizer with Logging {
  import kafka.security.authorizer.AclAuthorizer._

  private[security] val authorizerLogger = Logger("kafka.authorizer.logger")
  private var superUsers = Set.empty[KafkaPrincipal]
  private var shouldAllowEveryoneIfNoAclIsFound = false
  private var zkClient: KafkaZkClient = _
  private var aclChangeListeners: Iterable[AclChangeSubscription] = Iterable.empty
  private var extendedAclSupport: Boolean = _

  @volatile
  private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering)

  @volatile
  private var resourceCache = new scala.collection.immutable.HashMap[ResourceTypeKey,
    scala.collection.immutable.HashSet[String]]()

  private val lock = new Object()

  // The maximum number of times we should try to update the resource acls in zookeeper before failing;
  // This should never occur, but is a safeguard just in case.
  protected[security] var maxUpdateRetries = 10

  private val retryBackoffMs = 100
  private val retryBackoffJitterMs = 50

  /**
   * Guaranteed to be called before any authorize call is made.
   */
  override def configure(javaConfigs: util.Map[String, _]): Unit = {
    val configs = javaConfigs.asScala
    val props = new java.util.Properties()
    configs.forKeyValue { (key, value) => props.put(key, value.toString.trim) }

    superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
      case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
    }.getOrElse(Set.empty[KafkaPrincipal])

    shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.trim.toBoolean)

    // Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
    // means that `KafkaConfig.zkConnect` must always be set by the user (even if `AclAuthorizer.ZkUrlProp` is also
    // set).
    val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
    val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString.trim).getOrElse(kafkaConfig.zkConnect)
    val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
    val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
    val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)

    val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(kafkaConfig, configs)
    val time = Time.SYSTEM
    // createChrootIfNecessary=true is necessary in case we are running in a KRaft cluster
    // because such a cluster will not create any chroot path in ZooKeeper (it doesn't connect to ZooKeeper)
    zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs,
      zkMaxInFlightRequests, time, name = "ACL authorizer", zkClientConfig = zkClientConfig,
      metricGroup = "kafka.security", metricType = "AclAuthorizer", createChrootIfNecessary = true)
    zkClient.createAclPaths()

    extendedAclSupport = kafkaConfig.interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)

    // Start change listeners first and then populate the cache so that there is no timing window
    // between loading cache and processing change notifications.
    startZkChangeListeners()
    loadCache()
  }

  override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
    serverInfo.endpoints.asScala.map { endpoint =>
      endpoint -> CompletableFuture.completedFuture[Void](null)
    }.toMap.asJava
  }

  override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = {
    actions.asScala.map(action => authorizeAction(requestContext, action)).asJava
  }

  override def createAcls(requestContext: AuthorizableRequestContext,
                          aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = {
    val results = new Array[AclCreateResult](aclBindings.size)
    val aclsToCreate = aclBindings.asScala.zipWithIndex.filter { case (aclBinding, i) =>
      try {
        if (!extendedAclSupport && aclBinding.pattern.patternType == PatternType.PREFIXED) {
          throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
            s"${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG} of $IBP_2_0_IV1 or greater")
        }
        validateAclBinding(aclBinding)
        true
      } catch {
        case e: Throwable =>
          results(i) = new AclCreateResult(new InvalidRequestException("Failed to create ACL", apiException(e)))
          false
      }
    }.groupBy(_._1.pattern)

    if (aclsToCreate.nonEmpty) {
      lock synchronized {
        aclsToCreate.forKeyValue { (resource, aclsWithIndex) =>
          try {
            updateResourceAcls(resource) { currentAcls =>
              val newAcls = aclsWithIndex.map { case (acl, _) => new AclEntry(acl.entry) }
              currentAcls ++ newAcls
            }
            aclsWithIndex.foreach { case (_, index) => results(index) = AclCreateResult.SUCCESS }
          } catch {
            case e: Throwable =>
              aclsWithIndex.foreach { case (_, index) => results(index) = new AclCreateResult(apiException(e)) }
          }
        }
      }
    }
    results.toBuffer.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
  }

  /**
   *
   * <b>Concurrent updates:</b>
   * <ul>
   *   <li>If ACLs are created using [[kafka.security.authorizer.AclAuthorizer#createAcls]] while a delete is in
   *   progress, these ACLs may or may not be considered for deletion depending on the order of updates.
   *   The returned [[org.apache.kafka.server.authorizer.AclDeleteResult]] indicates which ACLs were deleted.</li>
   *   <li>If the provided filters use resource pattern type
   *   [[org.apache.kafka.common.resource.PatternType#MATCH]] that needs to filter all resources to determine
   *   matching ACLs, only ACLs that have already been propagated to the broker processing the ACL update will be
   *   deleted. This may not include some ACLs that were persisted, but not yet propagated to all brokers. The
   *   returned [[org.apache.kafka.server.authorizer.AclDeleteResult]] indicates which ACLs were deleted.</li>
   *   <li>If the provided filters use other resource pattern types that perform a direct match, all matching ACLs
   *   from previously completed [[kafka.security.authorizer.AclAuthorizer#createAcls]] are guaranteed to be deleted.</li>
   * </ul>
   */
  override def deleteAcls(requestContext: AuthorizableRequestContext,
                          aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = {
    val deletedBindings = new mutable.HashMap[AclBinding, Int]()
    val deleteExceptions = new mutable.HashMap[AclBinding, ApiException]()
    val filters = aclBindingFilters.asScala.zipWithIndex
    lock synchronized {
      // Find all potentially matching resource patterns from the provided filters and ACL cache and apply the filters
      val resources = aclCache.keys ++ filters.map(_._1.patternFilter).filter(_.matchesAtMostOne).flatMap(filterToResources)
      val resourcesToUpdate = resources.map { resource =>
        val matchingFilters = filters.filter { case (filter, _) =>
          filter.patternFilter.matches(resource)
        }
        resource -> matchingFilters
      }.toMap.filter(_._2.nonEmpty)

      resourcesToUpdate.forKeyValue { (resource, matchingFilters) =>
        val resourceBindingsBeingDeleted = new mutable.HashMap[AclBinding, Int]()
        try {
          updateResourceAcls(resource) { currentAcls =>
            val aclsToRemove = currentAcls.filter { acl =>
              matchingFilters.exists { case (filter, index) =>
                val matches = filter.entryFilter.matches(acl)
                if (matches) {
                  val binding = new AclBinding(resource, acl)
                  deletedBindings.getOrElseUpdate(binding, index)
                  resourceBindingsBeingDeleted.getOrElseUpdate(binding, index)
                }
                matches
              }
            }
            currentAcls -- aclsToRemove
          }
        } catch {
          case e: Exception =>
            resourceBindingsBeingDeleted.keys.foreach { binding =>
              deleteExceptions.getOrElseUpdate(binding, apiException(e))
            }
        }
      }
    }
    val deletedResult = deletedBindings.groupBy(_._2).map { case (k, bindings) =>
      k -> bindings.keys.map { binding => new AclBindingDeleteResult(binding, deleteExceptions.get(binding).orNull) }
    }
    (0 until aclBindingFilters.size).map { i =>
      new AclDeleteResult(deletedResult.getOrElse(i, Set.empty[AclBindingDeleteResult]).toSet.asJava)
    }.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
  }

  override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
    val aclBindings = new util.ArrayList[AclBinding]()
    aclCache.forKeyValue { case (resource, versionedAcls) =>
      versionedAcls.acls.foreach { acl =>
        val binding = new AclBinding(resource, acl.ace)
        if (filter.matches(binding))
          aclBindings.add(binding)
      }
    }
    aclBindings
  }

  override def close(): Unit = {
    aclChangeListeners.foreach(listener => listener.close())
    if (zkClient != null) zkClient.close()
  }

  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
                                       op: AclOperation,
                                       resourceType: ResourceType): AuthorizationResult = {
    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)

    val principal = new KafkaPrincipal(
      requestContext.principal().getPrincipalType,
      requestContext.principal().getName)

    if (isSuperUser(principal))
      return AuthorizationResult.ALLOWED

    val resourceSnapshot = resourceCache
    val principalStr = principal.toString
    val host = requestContext.clientAddress().getHostAddress
    val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true)

    val denyLiterals = matchingResources(
      resourceSnapshot, principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL)

    if (denyAll(denyLiterals)) {
      logAuditMessage(requestContext, action, authorized = false)
      return AuthorizationResult.DENIED
    }

    if (shouldAllowEveryoneIfNoAclIsFound) {
      logAuditMessage(requestContext, action, authorized = true)
      return AuthorizationResult.ALLOWED
    }

    val denyPrefixes = matchingResources(
      resourceSnapshot, principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED)

    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
      if (hasMatchingResources(resourceSnapshot, principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
          || hasMatchingResources(resourceSnapshot, principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
        logAuditMessage(requestContext, action, authorized = true)
        return AuthorizationResult.ALLOWED
      } else {
        logAuditMessage(requestContext, action, authorized = false)
        return AuthorizationResult.DENIED
      }
    }

    val allowLiterals = matchingResources(
      resourceSnapshot, principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)
    val allowPrefixes = matchingResources(
      resourceSnapshot, principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)

    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
      logAuditMessage(requestContext, action, authorized = true)
      return AuthorizationResult.ALLOWED
    }

    logAuditMessage(requestContext, action, authorized = false)
    AuthorizationResult.DENIED
  }

  private def matchingResources(resourceSnapshot: immutable.Map[ResourceTypeKey, immutable.Set[String]],
                                principal: String, host: String, op: AclOperation, permission: AclPermissionType,
                                resourceType: ResourceType, patternType: PatternType): ArrayBuffer[Set[String]] = {
    val matched = ArrayBuffer[immutable.Set[String]]()
    for (p <- Set(principal, AclEntry.WILDCARD_PRINCIPAL_STRING);
         h <- Set(host, AclEntry.WILDCARD_HOST);
         o <- Set(op, AclOperation.ALL)) {
      val resourceTypeKey = ResourceTypeKey(
        new AccessControlEntry(p, h, o, permission), resourceType, patternType)
      resourceSnapshot.get(resourceTypeKey) match {
        case Some(resources) => matched += resources
        case None =>
      }
    }
    matched
  }

  private def hasMatchingResources(resourceSnapshot: immutable.Map[ResourceTypeKey, immutable.Set[String]],
                                   principal: String, host: String, op: AclOperation, permission: AclPermissionType,
                                   resourceType: ResourceType, patternType: PatternType): Boolean = {
    for (p <- Set(principal, AclEntry.WILDCARD_PRINCIPAL_STRING);
         h <- Set(host, AclEntry.WILDCARD_HOST);
         o <- Set(op, AclOperation.ALL)) {
          val resourceTypeKey = ResourceTypeKey(
            new AccessControlEntry(p, h, o, permission), resourceType, patternType)
          if (resourceSnapshot.contains(resourceTypeKey))
            return true
    }
    false
  }

  private def denyAll(denyLiterals: ArrayBuffer[immutable.Set[String]]): Boolean =
    denyLiterals.exists(_.contains(ResourcePattern.WILDCARD_RESOURCE))


  private def allowAny(allowLiterals: ArrayBuffer[immutable.Set[String]], allowPrefixes: ArrayBuffer[immutable.Set[String]],
                       denyLiterals: ArrayBuffer[immutable.Set[String]], denyPrefixes: ArrayBuffer[immutable.Set[String]]): Boolean = {
    (allowPrefixes.exists(_.exists(prefix => allowPrefix(prefix, denyPrefixes)))
      || allowLiterals.exists(_.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes))))
  }

  private def allowLiteral(literalName: String, denyLiterals: ArrayBuffer[immutable.Set[String]],
                           denyPrefixes: ArrayBuffer[immutable.Set[String]]): Boolean = {
    literalName match {
      case ResourcePattern.WILDCARD_RESOURCE => true
      case _ => !denyLiterals.exists(_.contains(literalName)) && !hasDominantPrefixedDeny(literalName, denyPrefixes)
    }
  }

  private def allowPrefix(prefixName: String,
                          denyPrefixes: ArrayBuffer[immutable.Set[String]]): Boolean = {
    !hasDominantPrefixedDeny(prefixName, denyPrefixes)
  }

  private def hasDominantPrefixedDeny(resourceName: String, denyPrefixes: ArrayBuffer[immutable.Set[String]]): Boolean = {
    val sb = new StringBuilder
    for (ch <- resourceName.toCharArray) {
      sb.append(ch)
      if (denyPrefixes.exists(p => p.contains(sb.toString()))) {
        return true
      }
    }
    false
  }

  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    val resource = action.resourcePattern
    if (resource.patternType != PatternType.LITERAL) {
      throw new IllegalArgumentException("Only literal resources are supported. Got: " + resource.patternType)
    }

    // ensure we compare identical classes
    val sessionPrincipal = requestContext.principal
    val principal = if (classOf[KafkaPrincipal] != sessionPrincipal.getClass)
      new KafkaPrincipal(sessionPrincipal.getPrincipalType, sessionPrincipal.getName)
    else
      sessionPrincipal

    val host = requestContext.clientAddress.getHostAddress
    val operation = action.operation

    def isEmptyAclAndAuthorized(acls: AclSeqs): Boolean = {
      if (acls.isEmpty) {
        // No ACLs found for this resource, permission is determined by value of config allow.everyone.if.no.acl.found
        authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
        shouldAllowEveryoneIfNoAclIsFound
      } else false
    }

    def denyAclExists(acls: AclSeqs): Boolean = {
      // Check if there are any Deny ACLs which would forbid this operation.
      matchingAclExists(operation, resource, principal, host, DENY, acls)
    }

    def allowAclExists(acls: AclSeqs): Boolean = {
      // Check if there are any Allow ACLs which would allow this operation.
      // Allowing read, write, delete, or alter implies allowing describe.
      // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
      val allowOps = operation match {
        case DESCRIBE => Set[AclOperation](DESCRIBE, READ, WRITE, DELETE, ALTER)
        case DESCRIBE_CONFIGS => Set[AclOperation](DESCRIBE_CONFIGS, ALTER_CONFIGS)
        case _ => Set[AclOperation](operation)
      }
      allowOps.exists(operation => matchingAclExists(operation, resource, principal, host, ALLOW, acls))
    }

    def aclsAllowAccess = {
      // we allow an operation if no acls are found and user has configured to allow all users
      // when no acls are found or if no deny acls are found and at least one allow acls matches.
      val acls = matchingAcls(resource.resourceType, resource.name)
      isEmptyAclAndAuthorized(acls) || (!denyAclExists(acls) && allowAclExists(acls))
    }

    // Evaluate if operation is allowed
    val authorized = isSuperUser(principal) || aclsAllowAccess

    logAuditMessage(requestContext, action, authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }

  private def isSuperUser(principal: KafkaPrincipal): Boolean = {
    if (superUsers.contains(principal)) {
      authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.")
      true
    } else false
  }

  @nowarn("cat=deprecation")
  private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
    // this code is performance sensitive, make sure to run AclAuthorizerBenchmark after any changes

    // save aclCache reference to a local val to get a consistent view of the cache during acl updates.
    val aclCacheSnapshot = aclCache
    val wildcard = aclCacheSnapshot.get(new ResourcePattern(resourceType, ResourcePattern.WILDCARD_RESOURCE, PatternType.LITERAL))
      .map(_.acls.toBuffer)
      .getOrElse(mutable.Buffer.empty)

    val literal = aclCacheSnapshot.get(new ResourcePattern(resourceType, resourceName, PatternType.LITERAL))
      .map(_.acls.toBuffer)
      .getOrElse(mutable.Buffer.empty)

    val prefixed = new ArrayBuffer[AclEntry]
    aclCacheSnapshot
      .from(new ResourcePattern(resourceType, resourceName, PatternType.PREFIXED))
      .to(new ResourcePattern(resourceType, resourceName.take(1), PatternType.PREFIXED))
      .forKeyValue { (resource, acls) =>
        if (resourceName.startsWith(resource.name)) prefixed ++= acls.acls
      }

    new AclSeqs(prefixed, wildcard, literal)
  }

  private def matchingAclExists(operation: AclOperation,
                                resource: ResourcePattern,
                                principal: KafkaPrincipal,
                                host: String,
                                permissionType: AclPermissionType,
                                acls: AclSeqs): Boolean = {
    acls.find { acl =>
      acl.permissionType == permissionType &&
        (acl.kafkaPrincipal == principal || acl.kafkaPrincipal == AclEntry.WILDCARD_PRINCIPAL) &&
        (operation == acl.operation || acl.operation == AclOperation.ALL) &&
        (acl.host == host || acl.host == AclEntry.WILDCARD_HOST)
    }.exists { acl =>
      authorizerLogger.debug(s"operation = $operation on resource = $resource from host = $host is $permissionType based on acl = $acl")
      true
    }
  }

  private def loadCache(): Unit = {
    lock synchronized  {
      loadAllAcls(zkClient, this, updateCache)
    }
  }

  private[authorizer] def startZkChangeListeners(): Unit = {
    aclChangeListeners = ZkAclChangeStore.stores
      .map(store => store.createListener(AclChangedNotificationHandler, zkClient))
  }

  private def filterToResources(filter: ResourcePatternFilter): Set[ResourcePattern] = {
    filter.patternType match {
      case PatternType.LITERAL | PatternType.PREFIXED =>
        Set(new ResourcePattern(filter.resourceType, filter.name, filter.patternType))
      case PatternType.ANY =>
        Set(new ResourcePattern(filter.resourceType, filter.name, PatternType.LITERAL),
          new ResourcePattern(filter.resourceType, filter.name, PatternType.PREFIXED))
      case _ => throw new IllegalArgumentException(s"Cannot determine matching resources for patternType $filter")
    }
  }

  private def logAuditMessage(requestContext: AuthorizableRequestContext, action: Action, authorized: Boolean): Unit = {
    def logMessage: String = {
      val principal = requestContext.principal
      val operation = SecurityUtils.operationName(action.operation)
      val host = requestContext.clientAddress.getHostAddress
      val resourceType = SecurityUtils.resourceTypeName(action.resourcePattern.resourceType)
      val resource = s"$resourceType$RESOURCE_SEPARATOR${action.resourcePattern.patternType}$RESOURCE_SEPARATOR${action.resourcePattern.name}"
      val authResult = if (authorized) "Allowed" else "Denied"
      val apiKey = if (ApiKeys.hasId(requestContext.requestType)) ApiKeys.forId(requestContext.requestType).name else requestContext.requestType
      val refCount = action.resourceReferenceCount

      s"Principal = $principal is $authResult Operation = $operation " +
        s"from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount"
    }

    if (authorized) {
      // logIfAllowed is true if access is granted to the resource as a result of this authorization.
      // In this case, log at debug level. If false, no access is actually granted, the result is used
      // only to determine authorized operations. So log only at trace level.
      if (action.logIfAllowed)
        authorizerLogger.debug(logMessage)
      else
        authorizerLogger.trace(logMessage)
    } else {
      // logIfDenied is true if access to the resource was explicitly requested. Since this is an attempt
      // to access unauthorized resources, log at info level. If false, this is either a request to determine
      // authorized operations or a filter (e.g for regex subscriptions) to filter out authorized resources.
      // In this case, log only at trace level.
      if (action.logIfDenied)
        authorizerLogger.info(logMessage)
      else
        authorizerLogger.trace(logMessage)
    }
  }

  /**
    * Safely updates the resources ACLs by ensuring reads and writes respect the expected zookeeper version.
    * Continues to retry until it successfully updates zookeeper.
    *
    * Returns a boolean indicating if the content of the ACLs was actually changed.
    *
    * @param resource the resource to change ACLs for
    * @param getNewAcls function to transform existing acls to new ACLs
    * @return boolean indicating if a change was made
    */
  private def updateResourceAcls(resource: ResourcePattern)(getNewAcls: Set[AclEntry] => Set[AclEntry]): Boolean = {
    var currentVersionedAcls =
      if (aclCache.contains(resource))
        getAclsFromCache(resource)
      else
        getAclsFromZk(zkClient, resource)
    var newVersionedAcls: VersionedAcls = null
    var writeComplete = false
    var retries = 0
    while (!writeComplete && retries <= maxUpdateRetries) {
      val newAcls = getNewAcls(currentVersionedAcls.acls)
      val (updateSucceeded, updateVersion) =
        if (newAcls.nonEmpty) {
          if (currentVersionedAcls.exists)
            zkClient.conditionalSetAclsForResource(resource, newAcls, currentVersionedAcls.zkVersion)
          else
            zkClient.createAclsForResourceIfNotExists(resource, newAcls)
        } else {
          trace(s"Deleting path for $resource because it had no ACLs remaining")
          (zkClient.conditionalDelete(resource, currentVersionedAcls.zkVersion), 0)
        }

      if (!updateSucceeded) {
        trace(s"Failed to update ACLs for $resource. Used version ${currentVersionedAcls.zkVersion}. Reading data and retrying update.")
        Thread.sleep(backoffTime)
        currentVersionedAcls = getAclsFromZk(zkClient, resource)
        retries += 1
      } else {
        newVersionedAcls = VersionedAcls(newAcls, updateVersion)
        writeComplete = updateSucceeded
      }
    }

    if (!writeComplete)
      throw new IllegalStateException(s"Failed to update ACLs for $resource after trying a maximum of $maxUpdateRetries times")

    if (newVersionedAcls.acls != currentVersionedAcls.acls) {
      info(s"Updated ACLs for $resource with new version ${newVersionedAcls.zkVersion}")
      debug(s"Updated ACLs for $resource to $newVersionedAcls")
      updateCache(resource, newVersionedAcls)
      updateAclChangedFlag(resource)
      true
    } else {
      debug(s"Updated ACLs for $resource, no change was made")
      updateCache(resource, newVersionedAcls) // Even if no change, update the version
      false
    }
  }

  private def getAclsFromCache(resource: ResourcePattern): VersionedAcls = {
    aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
  }


  // Visible for benchmark
  def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
    val currentAces: Set[AccessControlEntry] = aclCache.get(resource).map(_.acls.map(_.ace)).getOrElse(Set.empty)
    val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry => aclEntry.ace)
    val acesToAdd = newAces.diff(currentAces)
    val acesToRemove = currentAces.diff(newAces)

    acesToAdd.foreach { ace =>
      val resourceTypeKey = ResourceTypeKey(ace, resource.resourceType(), resource.patternType())
      resourceCache.get(resourceTypeKey) match {
        case Some(resources) => resourceCache += (resourceTypeKey -> (resources + resource.name()))
        case None => resourceCache += (resourceTypeKey -> immutable.HashSet(resource.name()))
      }
    }
    acesToRemove.foreach { ace =>
      val resourceTypeKey = ResourceTypeKey(ace, resource.resourceType(), resource.patternType())
      resourceCache.get(resourceTypeKey) match {
        case Some(resources) =>
          val newResources = resources - resource.name()
          if (newResources.isEmpty) {
            resourceCache -= resourceTypeKey
          } else {
            resourceCache += (resourceTypeKey -> newResources)
          }
        case None =>
      }
    }

    if (versionedAcls.acls.nonEmpty) {
      aclCache = aclCache.updated(resource, versionedAcls)
    } else {
      aclCache -= resource
    }
  }

  private def updateAclChangedFlag(resource: ResourcePattern): Unit = {
    zkClient.createAclChangeNotification(resource)
  }

  private def backoffTime = {
    retryBackoffMs + Random.nextInt(retryBackoffJitterMs)
  }

  private def apiException(e: Throwable): ApiException = {
    e match {
      case e1: ApiException => e1
      case e1 => new ApiException(e1)
    }
  }

  private[authorizer] def processAclChangeNotification(resource: ResourcePattern): Unit = {
    lock synchronized {
      val versionedAcls = getAclsFromZk(zkClient, resource)
      info(s"Processing Acl change notification for $resource, versionedAcls : ${versionedAcls.acls}, zkVersion : ${versionedAcls.zkVersion}")
      updateCache(resource, versionedAcls)
    }
  }

  private object AclChangedNotificationHandler extends AclChangeNotificationHandler {
    override def processNotification(resource: ResourcePattern): Unit = {
      processAclChangeNotification(resource)
    }
  }

  private case class ResourceTypeKey(ace: AccessControlEntry,
                                     resourceType: ResourceType,
                                     patternType: PatternType)
}
